{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# Install the third-party packages this notebook imports:\n",
        "# ultralytics (provides YOLO) and ffmpeg-python (provides the `ffmpeg` module).\n",
        "# %pip is used instead of !pip so the packages are installed into the\n",
        "# environment of the running kernel rather than whatever `pip` is on the shell PATH.\n",
        "%pip install ultralytics ffmpeg-python"
      ],
      "metadata": {
        "id": "WJTaMsEd09Vt"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Download model and video.\n",
        "# The gdown argument looks like a Google Drive file ID; presumably it resolves to the\n",
        "# yolov8n-face.pt weights that are loaded further down -- TODO confirm.\n",
        "!gdown 1qcr9DbgsX3ryrz2uU8w4Xm3cOrRywXqb\n",
        "# Sample input clip; the notebook later reads it as kunkun.mp4 from the working directory.\n",
        "!wget https://github.com/iioSnail/pytorch_deep_learning_examples/raw/refs/heads/main/asserts/mp4/kunkun.mp4"
      ],
      "metadata": {
        "id": "eXGr2Fl1Wpdc"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "x3QIKG890rxR"
      },
      "outputs": [],
      "source": [
        "import ffmpeg\n",
        "import cv2\n",
        "from numpy import ndarray\n",
        "from ultralytics import YOLO\n",
        "from tqdm import tqdm"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "# Apply mosaic to an image\n",
        "def mosaic_image(model, image: ndarray, mosaic_scale = 10) -> ndarray:\n",
        "    \"\"\"Pixelate every region that `model` detects in `image`.\n",
        "\n",
        "    Args:\n",
        "        model: Detector called as `model(image, verbose=False)`; the first\n",
        "            result must expose `.boxes.xyxy`, one (x1, y1, x2, y2) row per detection.\n",
        "        image: Image array indexed as [row, column, ...]; modified in place.\n",
        "        mosaic_scale: Positive integer downscaling factor; larger values\n",
        "            give coarser mosaic blocks.\n",
        "\n",
        "    Returns:\n",
        "        The same `image` object with every detected box pixelated.\n",
        "    \"\"\"\n",
        "    results = model(image, verbose=False)\n",
        "\n",
        "    for box in results[0].boxes.xyxy:\n",
        "        # Convert to plain Python ints so slicing and the size arithmetic\n",
        "        # below do not depend on the array type returned by the model.\n",
        "        x1, y1, x2, y2 = box.int().tolist()\n",
        "        roi = image[y1:y2, x1:x2]\n",
        "\n",
        "        h, w = roi.shape[:2]\n",
        "        if h == 0 or w == 0:\n",
        "            # Degenerate box: nothing to pixelate, and cv2.resize rejects empty input.\n",
        "            continue\n",
        "\n",
        "        # Clamp to at least 1 px: for boxes smaller than mosaic_scale the integer\n",
        "        # division yields 0, and cv2.resize raises on a zero-sized target.\n",
        "        small_size = (max(1, w // mosaic_scale), max(1, h // mosaic_scale))\n",
        "        small_roi = cv2.resize(roi, small_size, interpolation=cv2.INTER_LINEAR)\n",
        "        # Nearest-neighbour upscaling keeps the hard block edges of the mosaic.\n",
        "        mosaic_roi = cv2.resize(small_roi, (w, h), interpolation=cv2.INTER_NEAREST)\n",
        "        image[y1:y2, x1:x2] = mosaic_roi\n",
        "\n",
        "    return image"
      ],
      "metadata": {
        "id": "ys1gOBSBWY6C"
      },
      "execution_count": 4,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Define filepaths.\n",
        "# Source clip that is read for both audio extraction and frame processing.\n",
        "input_video = \"kunkun.mp4\"\n",
        "# Intermediate WAV file holding the audio extracted from input_video.\n",
        "tmp_audio = \"tmp.wav\"\n",
        "# Intermediate video written by cv2.VideoWriter with the mosaic applied.\n",
        "tmp_video = \"tmp_kunkun.mp4\"\n",
        "# Final result: tmp_video combined with tmp_audio.\n",
        "output_video = \"mosaic_kunkun.mp4\"\n",
        "\n",
        "# Detector passed to mosaic_image; presumably a face-detection model,\n",
        "# judging by the weights file name -- TODO confirm.\n",
        "model = YOLO(\"yolov8n-face.pt\")"
      ],
      "metadata": {
        "id": "js-8koFq04mb"
      },
      "execution_count": 5,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Pull the audio track out of the source clip into a temporary WAV file.\n",
        "audio_source = ffmpeg.input(input_video)\n",
        "audio_job = audio_source.output(tmp_audio, format='wav')\n",
        "# overwrite_output=True lets the cell be re-run when tmp_audio already exists.\n",
        "audio_job.run(overwrite_output=True)"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "Q270pQcA1JiA",
        "outputId": "9669c593-23b2-437b-9701-47845588cca9"
      },
      "execution_count": 6,
      "outputs": [
        {
          "output_type": "execute_result",
          "data": {
            "text/plain": [
              "(None, None)"
            ]
          },
          "metadata": {},
          "execution_count": 6
        }
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "# Apply the mosaic frame by frame and write the result to tmp_video.\n",
        "cap = cv2.VideoCapture(input_video)\n",
        "if not cap.isOpened():\n",
        "    cap.release()\n",
        "    # Raise instead of calling exit(0): in a notebook exit() asks the kernel to\n",
        "    # shut down, and status 0 would not even signal that something went wrong.\n",
        "    raise RuntimeError(f\"Could not open video file: {input_video}\")\n",
        "\n",
        "# Reuse the source frame size and frame rate for the output video.\n",
        "width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))\n",
        "height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))\n",
        "fps = cap.get(cv2.CAP_PROP_FPS)\n",
        "# Only used as the progress-bar total.\n",
        "n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))\n",
        "\n",
        "fourcc = cv2.VideoWriter_fourcc(*'mp4v')\n",
        "out = cv2.VideoWriter(tmp_video, fourcc, fps, (width, height))\n",
        "pro_bar = tqdm(total=n_frames)\n",
        "\n",
        "try:\n",
        "    if not out.isOpened():\n",
        "        # Fail loudly instead of silently dropping every frame.\n",
        "        raise RuntimeError(f\"Could not open video writer for: {tmp_video}\")\n",
        "\n",
        "    while True:\n",
        "        ret, frame = cap.read()\n",
        "        if not ret:\n",
        "            # No more frames could be read: stop processing.\n",
        "            break\n",
        "\n",
        "        frame = mosaic_image(model, frame)\n",
        "        out.write(frame)\n",
        "        pro_bar.update(1)\n",
        "finally:\n",
        "    # Release the capture, writer and progress bar even if detection or\n",
        "    # writing fails midway, so no handles are left open in the kernel.\n",
        "    cap.release()\n",
        "    out.release()\n",
        "    pro_bar.close()"
      ],
      "metadata": {
        "id": "WkKbG2uw1mnJ"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Combine the processed video with the extracted audio into the final file.\n",
        "merge_job = ffmpeg.output(\n",
        "    ffmpeg.input(tmp_video),\n",
        "    ffmpeg.input(tmp_audio),\n",
        "    output_video,\n",
        "    vcodec=\"copy\",\n",
        "    acodec='aac',\n",
        ")\n",
        "# overwrite_output=True lets the cell be re-run when output_video already exists.\n",
        "merge_job.run(overwrite_output=True)"
      ],
      "metadata": {
        "id": "GiYXjgvQ1p8F"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Show the result video.\n",
        "from IPython.display import HTML\n",
        "from base64 import b64encode\n",
        "\n",
        "# Compressed video path\n",
        "compressed_path = \"./compressed.mp4\"\n",
        "# Re-encode output_video with libx264 through ffmpeg-python, like the other cells.\n",
        "# overwrite_output=True keeps the cell re-runnable when compressed.mp4 already exists,\n",
        "# and .run() raises if ffmpeg fails, so a stale or missing file is never embedded.\n",
        "ffmpeg.input(output_video).output(compressed_path, vcodec=\"libx264\").run(overwrite_output=True)\n",
        "\n",
        "# Show video: embed the file in the page as a base64 data URL.\n",
        "with open(compressed_path, 'rb') as video_file:\n",
        "    mp4 = video_file.read()\n",
        "data_url = \"data:video/mp4;base64,\" + b64encode(mp4).decode()\n",
        "HTML(\"\"\"\n",
        "<video width=400 controls>\n",
        "      <source src=\"%s\" type=\"video/mp4\">\n",
        "</video>\n",
        "\"\"\" % data_url)"
      ],
      "metadata": {
        "id": "AabemWblYA50"
      },
      "execution_count": null,
      "outputs": []
    }
  ]
}